源码|HDFS之DataNode:写数据块(1)、源码|HDFS之DataNode:写数据块(2)分别分析了无管道无异常、管道写无异常的情况下,datanode上的写数据块过程。本文分析管道写有异常的情况,假设副本系数3(即写数据块涉及1个客户端+3个datanode),讨论datanode对不同异常种类、不同异常时机的处理。
源码版本:Apache Hadoop 2.6.0
结论与实现都相对简单。可仅阅读总览。
开始之前
总览
datanode对写数据块过程中的异常处理比较简单,通常采用两种策略:
- 当前节点抛异常,关闭上下游的IO流、socket等,以关闭管道。
- 向上游节点发送携带故障信息的ack。
只有少部分情况采用方案2;大部分情况采用方案1,简单关闭管道了事;部分情况二者结合。
虽然异常处理策略简单,但涉及异常处理的代码却不少,整体思路参照源码|HDFS之DataNode:写数据块(1)主流程中的DataXceiver#writeBlock()方法,部分融合了源码|HDFS之DataNode:写数据块(2)中管道写的内容 。本文从主流程DataXceiver#writeBlock()入手,部分涉及DataXceiver#writeBlock()的外层方法。
更值得关注的是写数据块的故障恢复流程,该工作由客户端主导,猴子将在对客户端的分析中讨论。
文章的组织结构
- 如果只涉及单个分支的分析,则放在同一节。
- 如果涉及多个分支的分析,则在下一级分多个节,每节讨论一个分支。
- 多线程的分析同多分支。
- 每一个分支和线程的组织结构遵循规则1-3。
主流程:DataXceiver#writeBlock()
DataXceiver#writeBlock():
|
|
最后的finally块对异常处理至关重要:
正常情况不表。对于异常情况,关闭所有到下游的IO流(mirrorOut、mirrorIn)、socket(mirrorSock),关闭到上游的输出流(replyOut),关闭blockReceiver内部封装的大部分资源(通过BlockReceiver#close()完成),剩余资源如到上游的输入流(in)由外层的DataXceiver#run()中的finally块关闭。
replyOut只是一个过滤器流,其包装的底层输出流也可以由DataXceiver#run()中的finally块关闭。限于篇幅,本文不展开。
记住此处finally块的作用,后面将多次重复该处代码,构成总览中的方案1。
下面以三个关键过程为例,分析这三个关键过程中的异常处理,及其与外层异常处理逻辑的交互。
本地准备:BlockReceiver.<init>()
根据前文的分析,BlockReceiver.<init>()
的主要工作比较简单:在rbw目录下创建block文件和meta文件:
|
|
特别提一下DataNode#checkDiskErrorAsync(),该方法异步检查是否有磁盘错误,如果错误磁盘超过阈值,就关闭datanode。但阈值的计算猴子还没有看懂,看起来是对DataStorage的理解有问题。
BlockReceiver#close()的工作已经介绍过了。需要关注的是对ReplicaAlreadyExistsException与其他IOException的处理:重新抛出。
ReplicaAlreadyExistsException是IOException的子类,由FsDatasetImpl#createRbw()抛出。
至于抛出IOException的情况就太多了,无权限、磁盘错误等非常原因。
重新抛出这些异常块会怎样呢?触发外层DataXceiver#writeBlock()中的catch块与finally块。
由于至今还没有建立下游管道,先让我们看看由于异常执行finally块,对上游节点产生的恶果:
- 在DataXceiver线程启动后,DataXceiver#peer中封装了当前节点到上游节点的输出流(out)与上游节点到当前节点的输入流(in)。
- 这些IO流的本质是socket,关闭当前节点端的socket后,上游节点端的socket也会在一段时间后触发超时关闭,并抛出SocketException(IOException的子类)。
- 上游节点由于socket关闭捕获到了IOException,于是也执行finally块,重复一遍当前节点的流程。
如此,逐级关闭上游节点的管道,直到客户端对管道关闭的异常作出处理。
如果在创建block文件或meta文件时抛出了异常,目前没有策略及时清理rbw目录下的“无主”数据块。读者可尝试debug执行
BlockReceiver.<init>()
,在rbw目录下创建数据块后长时间不让线程继续执行,最终管道超时关闭,但rbw目录下的文件依然存在。不过数据块恢复过程可完成清理工作,此处不展开。
建立管道:if (targets.length > 0) {
代码块
如果本地准备没有发生异常,则开始建立管道:
|
|
根据前文对管道建立过程的分析,此处要创建到与下游节点间的部分IO流、socket。
建立资源、发送管道建立请求的过程中都有可能发生故障,抛出IOException及其子类。catch块处理这些IOException的逻辑采用了方案2:先向上游节点发送ack告知ERROR,然后关闭到下游节点的IO流(mirrorOut、mirrorIn)、关闭到下游的socket(mirrorSock)。最后,重新抛出异常,以触发外层的finally块。
此处执行的清理是外层finally块的子集,重点是多发送了一个ack,对该ack的处理留到PacketResponder线程的分析中。
不过,此时已经开始建立下游管道,再来看看由于异常执行catch块(外层finally块的分析见上),对下游节点产生的恶果:
- 初始化mirrorOut、mirrorIn、mirrorSock后,下游节点也通过DataXceiverServer建立了配套的IO流、socket等。
- 这些IO流的本质是socket,关闭当前节点端的socket后,下游节点端的socket也会在一段时间后触发超时关闭,并抛出SocketException(IOException的子类)。
- 下游节点由于socket关闭捕获到了IOException,于是也执行此处的catch块或外层的finally块,重复一遍当前节点的流程。
如此,逐级关闭下游节点的管道,直到客户端对管道关闭的异常作出处理。同时,由于最终会执行外层finally块,则也会逐级关闭上游节点的管道。
IO流mirrorOut、mirrorIn实际上共享TCP套接字mirrorSock;in、out同理。但关闭IO流时,除了底层socket,还要清理缓冲区等资源,因此,将它们分别列出是合理的。
管道写:BlockReceiver#receiveBlock()
根据前文的分析,如果管道成功建立,则BlockReceiver#receiveBlock()开始接收packet并响应ack:
|
|
仍旧分接收packet与响应ack两部分讨论。
同步接收packet:BlockReceiver#receivePacket()
根据前文的分析,BlockReceiver#receivePacket()负责接收上游的packet,并继续向下游节点管道写:
|
|
对管道写过程的分析要分尾节点与中间节点两种情况展开:
- 如果是尾节点,则持久化之前,要先对收到的packet做一次校验(使用packet本身的校验机制)。如果校验失败,则委托PacketResponder线程发送ERROR_CHECKSUM状态的ack,并再次抛出IOE。
- 如果是中间节点,则只需要向下游镜像写packet。假设在非中断的情况下发生异常,则仅仅标记
mirrorError = true
。这造成两个影响:- 走方案1:后续包都不会再写往下游节点,最终socket超时关闭,并逐级关闭上下游管道。
- 走方案2:上游将通过ack得知下游发生了错误(见后)。
尾节点异常的处理还是走方案1,中间节点同时走方案1与方案2。
异步发送ack:PacketResponder线程
根据前文的分析,PacketResponder线程负责接收下游节点的ack,并继续向上游管道响应:
|
|
对于OOB,还要关注PipelineAck#getOOBStatus():
|
|
与之前的分支相比,PacketResponder线程大量使用中断来代替抛异常使线程终止。除此之外,关于OOB状态与ERROR_CHECKSUM状态的处理有些特殊:
- OOB状态:将第一个OOB节点的状态,传递到客户端。OOB是由datanode重启引起的,因此,第一个OOB节点在发送OOB的ack后,就不会再发送其他ack,最终由于引起socket超时引起整个管道的关闭。
ERROR_CHECKSUM
状态:只有尾节点可能发出ERROR_CHECKSUM
状态的ack,发送后抛出IOE主动关闭PacketResponder线程;然后上游节点收到ERROR_CHECKSUM
状态的ack后,也将抛出IOE关闭PacketResponder线程,但不再发送ack;如果还有上游节点,将因为长期收不到ack,socket超时关闭。最终关闭整个管道。
需要注意的,OOB通常能保证传递到客户端;但尾节点发送的ERROR_CHECKSUM
无法保证被上游节点发现(先发ack再抛IOE只是一种努力,不过通常能保证),如果多于两个备份,则一定不会被客户端发现。
猴子没明白为什么此处要使用中断使线程终止。
总结
尽管总览中列出了两种方案,但可以看到,方案1是异常处理的主要方式:抛异常关socket,然后逐级导致管道关闭。
关闭管道后,由客户端决定后续处理,如数据块恢复等。